MapReduce Source Code Analysis
Suggestion: write a simple MapReduce program and use a debugger to step through the source code line by line.
Entry point: boolean flag = job.waitForCompletion(true);
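If you do not already have a job at hand, a minimal driver along the following lines gives you a concrete waitForCompletion(true) call to set a breakpoint on. This is only a sketch: WordCountMapper and WordCountReducer are placeholder classes you would supply yourself, and the input/output paths come from the command line.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCountMapper.class);    // placeholder mapper
            job.setReducerClass(WordCountReducer.class);  // placeholder reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Entry point for the source walk-through: set the breakpoint here
            boolean flag = job.waitForCompletion(true);
            System.exit(flag ? 0 : 1);
        }
    }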
The call chain, abridged to the key methods:

    public boolean waitForCompletion {            // mainly calls submit()
      submit() {
        setUseNewAPI()
        connect();
        public JobStatus run() {
          return submitter.submitJobInternal(Job.this, cluster) {
            checkSpecs(job)
            addMRFrameworkToDistributedCache(conf)
            int maps = writeSplits(job, submitJobDir) {
              writeNewSplits(job, jobSubmitDir) {
                List<InputSplit> splits = input.getSplits(job) {
                  // split computation -- see getSplits() below
                }
              }
            }
          }
        }
      }
    }
The method that computes the input splits, FileInputFormat.getSplits() (abridged):
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
        }

        return splits;
    }
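The effect of the loop and of SPLIT_SLOP is easiest to see on a concrete number. The standalone sketch below (the 260 MB file length and 128 MB split size are assumed values, not taken from the source) reproduces the loop's arithmetic: the leftover chunk, which the full getSplits() adds after the loop (elided in the excerpt above), absorbs a tail of up to 10% of splitSize instead of becoming a tiny extra split.

    public class SplitLoopDemo {
        private static final double SPLIT_SLOP = 1.1;    // same constant as in FileInputFormat

        public static void main(String[] args) {
            long splitSize = 128L * 1024 * 1024;          // assumed 128 MB split size
            long length = 260L * 1024 * 1024;             // hypothetical 260 MB input file
            long bytesRemaining = length;
            int n = 0;

            while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                System.out.printf("split %d: offset=%d, length=%d%n",
                        n++, length - bytesRemaining, splitSize);
                bytesRemaining -= splitSize;
            }
            // The remainder (132 MB here) becomes the last split: 128 MB plus the 4 MB tail.
            if (bytesRemaining != 0) {
                System.out.printf("split %d: offset=%d, length=%d%n",
                        n, length - bytesRemaining, bytesRemaining);
            }
        }
    }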
    protected long getFormatMinSplitSize() {
        return 1;
    }
    private static final double SPLIT_SLOP = 1.1;
    public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    }
    public static long getMaxSplitSize(JobContext context) {
        return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
    }
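SPLIT_MINSIZE and SPLIT_MAXSIZE are the configuration keys mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize (Hadoop 2.x/3.x), so both knobs can also be set directly on the Configuration. A minimal sketch, with arbitrary example values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitSizeConfigDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example values only: 32 MB lower bound, 256 MB upper bound for a split
            conf.setLong("mapreduce.input.fileinputformat.split.minsize", 32L * 1024 * 1024);
            conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
            Job job = Job.getInstance(conf, "split-size-config-demo");
            // getSplits() will now read these values via getMinSplitSize()/getMaxSplitSize()
        }
    }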
    long blockSize = file.getBlockSize() {
        return blocksize;
    }
// Finally, the split size itself is computed:

    long splitSize = computeSplitSize(blockSize, minSize, maxSize) {
        // defaults: minSize = 1, maxSize = Long.MAX_VALUE, blockSize = 128M
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
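A few concrete evaluations of the formula (with the common 128 MB default block size assumed) show the behaviour described in the note that follows:

    public class ComputeSplitSizeDemo {
        // Same formula as computeSplitSize() above
        static long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return Math.max(minSize, Math.min(maxSize, blockSize));
        }

        public static void main(String[] args) {
            long blockSize = 128L * 1024 * 1024;   // assume a 128 MB block size

            // defaults (minSize = 1, maxSize = Long.MAX_VALUE)  ->  splitSize = blockSize (128 MB)
            System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

            // maxSize lowered below blockSize (64 MB)           ->  splitSize = maxSize (64 MB)
            System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

            // minSize raised above blockSize (256 MB)           ->  splitSize = minSize (256 MB)
            System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
        }
    }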
Note: to control the split size, adjust the maxSize above. When maxSize is smaller than blockSize, changing maxSize changes the split size; when maxSize is larger than blockSize, the split size is simply blockSize.
A split is a logical, code-level partition of the input file; it does not have to match the blocksize exactly.
blocksize refers to how the file itself is divided into blocks on HDFS.
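In driver code, capping maxSize is usually done through the FileInputFormat helper methods. A sketch, with a 64 MB cap chosen purely as an example:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SmallerSplitsDemo {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            // Cap maxSize below the 128 MB block size, so splitSize drops to 64 MB
            // and roughly twice as many map tasks are launched for the same input.
            FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
            // Raising minSize above the block size would instead produce splits larger than a block:
            // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
        }
    }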